/* -------------------------------------------------------------------------
 *
 * tuplestore.c
 *	  Generalized routines for temporary tuple storage.
 *
 * This module handles temporary storage of tuples for purposes such
 * as Materialize nodes, hashjoin batch files, etc.  It is essentially
 * a dumbed-down version of tuplesort.c; it does no sorting of tuples
 * but can only store and regurgitate a sequence of tuples.  However,
 * because no sort is required, it is allowed to start reading the sequence
 * before it has all been written.	This is particularly useful for cursors,
 * because it allows random access within the already-scanned portion of
 * a query without having to process the underlying scan to completion.
 * Also, it is possible to support multiple independent read pointers.
 *
 * A temporary file is used to handle the data if it exceeds the
 * space limit specified by the caller.
 *
 * The (approximate) amount of memory allowed to the tuplestore is specified
 * in kilobytes by the caller.	We absorb tuples and simply store them in an
 * in-memory array as long as we haven't exceeded maxKBytes.  If we do exceed
 * maxKBytes, we dump all the tuples into a temp file and then read from that
 * when needed.
 *
 * Upon creation, a tuplestore supports a single read pointer, numbered 0.
 * Additional read pointers can be created using tuplestore_alloc_read_pointer.
 * Mark/restore behavior is supported by copying read pointers.
 *
 * When the caller requests backward-scan capability, we write the temp file
 * in a format that allows either forward or backward scan.  Otherwise, only
 * forward scan is allowed.  A request for backward scan must be made before
 * putting any tuples into the tuplestore.	Rewind is normally allowed but
 * can be turned off via tuplestore_set_eflags; turning off rewind for all
 * read pointers enables truncation of the tuplestore at the oldest read point
 * for minimal memory usage.  (The caller must explicitly call tuplestore_trim
 * at appropriate times for truncation to actually happen.)
 *
 * Note: in TSS_WRITEFILE state, the temp file's seek position is the
 * current write position, and the write-position variables in the tuplestore
 * aren't kept up to date.  Similarly, in TSS_READFILE state the temp file's
 * seek position is the active read pointer's position, and that read pointer
 * isn't kept up to date.  We update the appropriate variables using ftell()
 * before switching to the other state or activating a different read pointer.
 *
 *
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/utils/sort/tuplestore.c
 *
 * -------------------------------------------------------------------------
 */

#include "postgres.h"
#include "knl/knl_variable.h"

#include "access/tableam.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "pgstat.h"
#include "storage/buf/buffile.h"
#include "utils/memprot.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#ifdef USE_SPQ
#include "executor/node/nodeShareInputScan.h"
#endif

/*
 * Possible states of a Tuplestore object.	These denote the states that
 * persist between calls of Tuplestore routines.
 */
/*
 * Per the file header, tuples are held in memory (TSS_INMEM) until the
 * caller's space limit is exceeded, then dumped to a temp file
 * (TSS_WRITEFILE) and later read back from it (TSS_READFILE).
 */
typedef enum {
    TSS_INMEM,     /* Tuples still fit in memory */
    TSS_WRITEFILE, /* Writing to temp file */
    TSS_READFILE   /* Reading from temp file */
} TupStoreStatus;

/*
 * State for a single read pointer.  If we are in state INMEM then all the
 * read pointers' "current" fields denote the read positions.  In state
 * WRITEFILE, the file/offset fields denote the read positions.  In state
 * READFILE, inactive read pointers have valid file/offset, but the active
 * read pointer implicitly has position equal to the temp file's seek position.
 *
 * Special case: if eof_reached is true, then the pointer's read position is
 * implicitly equal to the write position, and current/file/offset aren't
 * maintained.	This way we need not update all the read pointers each time
 * we write.
 */
typedef struct {
    int eflags;       /* capability flags (EXEC_FLAG_REWIND / EXEC_FLAG_BACKWARD) */
    bool eof_reached; /* read has reached EOF; then current/file/offset are not maintained */
    int current;      /* next array index to read (used in TSS_INMEM state) */
    int file;         /* temp file# (used in the file-based states) */
    off_t offset;     /* byte offset in file (used in the file-based states) */
} TSReadPointer;

#ifdef USE_SPQ
/* Role of this tuplestore with respect to an SPQ shared temp file. */
typedef enum {
    TSHARE_NOT_SHARED, /* ordinary private tuplestore */
    TSHARE_WRITER,     /* owns the shared file and deletes it in tuplestore_end */
    TSHARE_READER      /* presumably consumes another process's shared file -- confirm */
} TSSharedStatus;
#endif
 
/*
 * Private state of a Tuplestore operation.
 */
struct Tuplestorestate {
    TupStoreStatus status;  /* enumerated value as shown above */
    int eflags;             /* capability flags (OR of pointers' flags) */
    bool backward;          /* store extra length words in file? */
    bool interXact;         /* keep open through transactions? */
    bool truncated;         /* tuplestore_trim has removed tuples? */
    int64 availMem;         /* remaining memory available, in bytes */
    int64 allowedMem;       /* total memory allowed, in bytes */
    BufFile* myfile;        /* underlying file, or NULL if none */
    MemoryContext context;  /* memory context for holding tuples */
    ResourceOwner resowner; /* resowner for holding temp files */

    /*
     * These function pointers decouple the routines that must know what kind
     * of tuple we are handling from the routines that don't need to know it.
     * They are set up by the tuplestore_begin_xxx routines.
     *
     * (Although tuplestore.c currently only supports heap tuples, I've copied
     * this part of tuplesort.c so that extension to other kinds of objects
     * will be easy if it's ever needed.)
     *
     * Function to copy a supplied input tuple into palloc'd space. (NB: we
     * assume that a single pfree_ext() is enough to release the tuple later, so
     * the representation must be "flat" in one palloc chunk.) state->availMem
     * must be decreased by the amount of space used.
     */
    void* (*copytup)(Tuplestorestate* state, void* tup);

    /*
     * Function to write a stored tuple onto tape.	The representation of the
     * tuple on tape need not be the same as it is in memory; requirements on
     * the tape representation are given below.  After writing the tuple,
     * pfree_ext() it, and increase state->availMem by the amount of memory space
     * thereby released.
     */
    void (*writetup)(Tuplestorestate* state, void* tup);

    /*
     * Function to read a stored tuple from tape back into memory. 'len' is
     * the already-read length of the stored tuple.  Create and return a
     * palloc'd copy, and decrease state->availMem by the amount of memory
     * space consumed.
     */
    void* (*readtup)(Tuplestorestate* state, unsigned int len);

    /*
     * This array holds pointers to tuples in memory if we are in state INMEM.
     * In states WRITEFILE and READFILE it's not used.
     *
     * When memtupdeleted > 0, the first memtupdeleted pointers are already
     * released due to a tuplestore_trim() operation, but we haven't expended
     * the effort to slide the remaining pointers down.  These unused pointers
     * are set to NULL to catch any invalid accesses.  Note that memtupcount
     * includes the deleted pointers.
     */
    void** memtuples;   /* array of pointers to palloc'd tuples */
    int memtupdeleted;  /* the first N slots are currently unused */
    int memtupcount;    /* number of tuples currently present */
    int memtupsize;     /* allocated length of memtuples array */
    bool growmemtuples; /* memtuples' growth still underway? */

    /*
     * These variables are used to keep track of the current positions.
     *
     * In state WRITEFILE, the current file seek position is the write point;
     * in state READFILE, the write position is remembered in writepos_xxx.
     * (The write position is the same as EOF, but since BufFileSeek doesn't
     * currently implement SEEK_END, we have to remember it explicitly.)
     */
    TSReadPointer* readptrs; /* array of read pointers */
    int activeptr;           /* index of the active read pointer */
    int readptrcount;        /* number of pointers currently valid */
    int readptrsize;         /* allocated length of readptrs array */

    int writepos_file;     /* file# (valid if READFILE state) */
    off_t writepos_offset; /* offset (valid if READFILE state) */
    int64 width;           /* accumulates t_len of tuples added while in memory (see puttuple paths) */
    bool causedBySysRes;   /* the batch increase caused by system resources limit? */
    int64 maxMem;          /* auto spread max mem (upper bound for AutoSpreadMem) */
    int spreadNum;         /* auto spread times of memory */
    int planId;            /* id of plan that used this state */
    int dop;               /* parallel num of the plan */
    bool isMemCtl;         /* whether context is under memory control */
#ifdef USE_SPQ
    /* Cross-process sharing state used by SPQ shared input scans. */
    TSSharedStatus share_status; /* role of this state for the shared temp file */
    bool frozen;                 /* NOTE(review): presumably "no further writes allowed" -- confirm */
    SharedFileSet *fileset;      /* fileset holding the shared temp file */
    char *shared_filename;       /* shared file name; deleted by the writer in tuplestore_end */
#endif
};

/* Dispatch through the per-tuple-kind routines installed by tuplestore_begin_xxx */
#define COPYTUP(state, tup) ((*(state)->copytup)(state, tup))
#define WRITETUP(state, tup) ((*(state)->writetup)(state, tup))
#define READTUP(state, len) ((*(state)->readtup)(state, len))
/* Adjust the in-memory space budget; amt is in bytes */
#define USEMEM(state, amt) ((state)->availMem -= (amt))
#define FREEMEM(state, amt) ((state)->availMem += (amt))

/* Check if system in status of lacking memory */
/*
 * Report whether the tuplestore is short on memory: either we have
 * overdrawn our own budget (availMem < 0) or the system as a whole
 * is under memory pressure for our scaled usage.
 */
static bool LACKMEM(Tuplestorestate* state)
{
    int64 memInUse = state->allowedMem - state->availMem;

    return (state->availMem < 0 || gs_sysmemory_busy(memInUse * state->dop, true));
}

/*
 * AutoSpreadMem:
 *	Memory auto-spread logic. This happens only when the work-mem
 *	threshold is met. If there is memory available in the system, we
 *	ask for at most 2x the current memory and at least 10% more;
 *	otherwise the spread fails.
 *
 * Parameters:
 *	@in state: tuple store state
 *	@out growRatio: capped growth ratio to apply after the spread
 *
 * Return:
 *	true if the spread succeeded, false otherwise.
 */
static bool AutoSpreadMem(Tuplestorestate* state, double* growRatio)
{
    int64 usedMem = state->allowedMem - state->availMem;

    /* For concurrent case, we don't allow materialize to spread a lot */
    if (!gs_sysmemory_busy(usedMem * state->dop, true) && state->maxMem > state->allowedMem &&
        (state->spreadNum < 2 || g_instance.wlm_cxt->stat_manager.comp_count == 1)) {
        /* If we have overdrawn the budget, first enlarge allowedMem to cover the deficit */
        if (state->availMem < 0)
            state->allowedMem -= state->availMem;
        /* Grant at most the client quota, at most a doubling, and never beyond maxMem */
        int64 spreadMem =
            Min(Min(dywlm_client_get_memory() * 1024L, state->allowedMem), state->maxMem - state->allowedMem);
        if (spreadMem > state->allowedMem * MEM_AUTO_SPREAD_MIN_RATIO) {
            /* Cap the caller's growth ratio to what the spread actually allows */
            *growRatio = Min(*growRatio, 1 + (double)spreadMem / state->allowedMem);
            state->allowedMem += spreadMem;
            /* NOTE(review): availMem is overwritten rather than incremented, so any
             * positive remainder is discarded -- confirm this matches the intended
             * accounting (it is consistent with the deficit adjustment above). */
            state->availMem = spreadMem;
            state->spreadNum++;
            if (state->isMemCtl) {
                /* Widen the memory-controlled context's ceiling to match */
                AllocSetContext* set = (AllocSetContext*)(state->context);
                set->maxSpaceSize += spreadMem;
            }
            MEMCTL_LOG(DEBUG2,
                "Materialize(%d) auto mem spread %ldKB succeed, and work mem is %ldKB.",
                state->planId,
                spreadMem / 1024L,
                state->allowedMem / 1024L);
            return true;
        }
        MEMCTL_LOG(LOG,
            "Materialize(%d) auto mem spread %ldKB failed, and work mem is %ldKB.",
            state->planId,
            spreadMem / 1024L,
            state->allowedMem / 1024L);
        if (state->spreadNum > 0) {
            pgstat_add_warning_spill_on_memory_spread();
        }
    }

    return false;
}

/* --------------------
 *
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero).
 * The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->backward is true, then the stored representation of
 * the tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows read-backwards.	When
 * state->backward is not set, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplestorestate record, if needed. They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should ereport() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the maxKBytes limit,
 * plus the space used by the variable-size array memtuples.
 * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
 * We don't worry about the size of the read pointer array, either.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.
 *
 * --------------------
 */

static Tuplestorestate* tuplestore_begin_common(int eflags, bool interXact, int maxKBytes);
static void tuplestore_puttuple_common(Tuplestorestate* state, void* tuple);
static void dumptuples(Tuplestorestate* state);
static unsigned int getlen(Tuplestorestate* state, bool eofOK);
static void* copytup_heap(Tuplestorestate* state, void* tup);
static void writetup_heap(Tuplestorestate* state, void* tup);
static void* readtup_heap(Tuplestorestate* state, unsigned int len);

/*
 *		tuplestore_begin_xxx
 *
 * Initialize for a tuple store operation.
 */
/*
 * Common initialization for all tuplestore kinds: allocate the state
 * struct, set up the in-memory tuple array, and create read pointer 0.
 */
static Tuplestorestate* tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
{
    Tuplestorestate* newstate = (Tuplestorestate*)palloc0(sizeof(Tuplestorestate));

    /* Basic control state: everything starts out in memory. */
    newstate->status = TSS_INMEM;
    newstate->eflags = eflags;
    newstate->interXact = interXact;
    newstate->truncated = false;
    newstate->allowedMem = maxKBytes * 1024L;
    newstate->availMem = newstate->allowedMem;
    newstate->myfile = NULL;
    newstate->context = CurrentMemoryContext;
    newstate->resowner = t_thrd.utils_cxt.CurrentResourceOwner;

    /* In-memory tuple array; its own space counts against the budget. */
    newstate->memtupdeleted = 0;
    newstate->memtupcount = 0;
    newstate->memtupsize = 1024; /* initial guess */
    newstate->growmemtuples = true;
    newstate->memtuples = (void**)palloc(newstate->memtupsize * sizeof(void*));
    USEMEM(newstate, GetMemoryChunkSpace(newstate->memtuples));

    /* Read-pointer array: pointer 0 exists from the start and is active. */
    newstate->activeptr = 0;
    newstate->readptrcount = 1;
    newstate->readptrsize = 8; /* arbitrary */
    newstate->readptrs = (TSReadPointer*)palloc(newstate->readptrsize * sizeof(TSReadPointer));
    newstate->readptrs[0].eflags = eflags;
    newstate->readptrs[0].eof_reached = false;
    newstate->readptrs[0].current = 0;

    return newstate;
}

/*
 * tuplestore_begin_heap
 *
 * Create a new tuplestore; other types of tuple stores (other than
 * "heap" tuple stores, for heap tuples) are possible, but not presently
 * implemented.
 *
 * randomAccess: if true, both forward and backward accesses to the
 * tuple store are allowed.
 *
 * interXact: if true, the files used for on-disk storage persist beyond the
 * end of the current transaction.	NOTE: It's the caller's responsibility to
 * create such a tuplestore in a memory context and resource owner that will
 * also survive transaction boundaries, and to ensure the tuplestore is closed
 * when it's no longer wanted.
 *
 * maxKBytes: how much data to store in memory (any data beyond this
 * amount is paged to disk).  When in doubt, use u_sess->attr.attr_memory.work_mem.
 *
 * memSpread: if auto memory spread is allowed for this tuplestore
 * planId: id of plan that uses this state
 */
Tuplestorestate* tuplestore_begin_heap(
    bool randomAccess, bool interXact, int maxKBytes, int maxMem, int planId, int dop, bool isAddMemCtl)
{
    /*
     * This interpretation of the meaning of randomAccess is compatible with
     * the pre-8.3 behavior of tuplestores.
     */
    int eflags = randomAccess ? (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) : (EXEC_FLAG_REWIND);

    Tuplestorestate* state = tuplestore_begin_common(eflags, interXact, maxKBytes);

    /* Install the heap-tuple handling routines. */
    state->copytup = copytup_heap;
    state->writetup = writetup_heap;
    state->readtup = readtup_heap;

    /* Workload-management bookkeeping. */
    state->width = 0;
    state->causedBySysRes = false;
    state->maxMem = maxMem * 1024L;
    state->spreadNum = 0;
    state->planId = planId;
    state->dop = dop;
    state->isMemCtl = isAddMemCtl;

    return state;
}

/*
 * tuplestore_set_eflags
 *
 * Set the capability flags for read pointer 0 at a finer grain than is
 * allowed by tuplestore_begin_xxx.  This must be called before inserting
 * any data into the tuplestore.
 *
 * eflags is a bitmask following the meanings used for executor node
 * startup flags (see executor.h).	tuplestore pays attention to these bits:
 *		EXEC_FLAG_REWIND		need rewind to start
 *		EXEC_FLAG_BACKWARD		need backward fetch
 * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
 * is set per "randomAccess" in the tuplestore_begin_xxx call.
 *
 * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
 * but not further than the truncation point (the furthest-back read pointer
 * position at the time of the last tuplestore_trim call).
 */
void tuplestore_set_eflags(Tuplestorestate* state, int eflags)
{
    /* Flags may only change while the store is still empty and in memory. */
    if (state->status != TSS_INMEM || state->memtupcount != 0) {
        ereport(ERROR,
            (errmodule(MOD_EXECUTOR),
            (errcode(ERRCODE_CHECK_VIOLATION), errmsg("too late to call tuplestore_set_eflags"))));
    }

    state->readptrs[0].eflags = eflags;
    if (state->readptrs[0].eflags < 0) {
        ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
                        errmsg("number need be a positive number in bitwise operation")));
    }

    /* Recompute state->eflags as the union of all read pointers' flags. */
    int combined = eflags;
    for (int i = 1; i < state->readptrcount; i++) {
        if (state->readptrs[i].eflags < 0) {
            ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
                            errmsg("number need be a positive number in bitwise operation")));
        }
        combined = (unsigned int)combined | (unsigned int)state->readptrs[i].eflags;
    }
    state->eflags = combined;
}

/*
 * tuplestore_alloc_read_pointer - allocate another read pointer.
 *
 * Returns the pointer's index.
 *
 * The new pointer initially copies the position of read pointer 0.
 * It can have its own eflags, but if any data has been inserted into
 * the tuplestore, these eflags must not represent an increase in
 * requirements.
 */
int tuplestore_alloc_read_pointer(Tuplestorestate* state, int eflags)
{
    /* Once data exists, a new pointer may not demand more capabilities. */
    if (state->status != TSS_INMEM || state->memtupcount != 0) {
        if ((int)((unsigned int)state->eflags | (unsigned int)eflags) != state->eflags)
            ereport(ERROR,
                (errmodule(MOD_EXECUTOR),
                    (errcode(ERRCODE_CHECK_VIOLATION), errmsg("too late to require new tuplestore eflags"))));
    }

    /* Double the pointer array when it is full. */
    if (state->readptrcount >= state->readptrsize) {
        int grown = state->readptrsize * 2;

        state->readptrs = (TSReadPointer*)repalloc(state->readptrs, grown * sizeof(TSReadPointer));
        state->readptrsize = grown;
    }

    /* New pointer starts at read pointer 0's position, with its own flags. */
    TSReadPointer* newptr = &state->readptrs[state->readptrcount];
    *newptr = state->readptrs[0];
    newptr->eflags = eflags;

    /* Guard the bitwise OR below against negative operands. */
    if (state->eflags < 0)
        ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
                        errmsg("number need be a positive number in bitwise operation")));
    if (eflags < 0)
        ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
                        errmsg("number need be a positive number in bitwise operation")));
    state->eflags = (unsigned int)state->eflags | (unsigned int)eflags;

    return state->readptrcount++;
}

/*
 * tuplestore_clear
 *
 *	Delete all the contents of a tuplestore, and reset its read pointers
 *	to the start.
 */
void tuplestore_clear(Tuplestorestate* state)
{
    /* Drop the temp file, if any. */
    if (state->myfile != NULL) {
        BufFileClose(state->myfile);
    }
    state->myfile = NULL;

    /* Release surviving in-memory tuples and credit their space back. */
    if (state->memtuples != NULL) {
        for (int i = state->memtupdeleted; i < state->memtupcount; i++) {
            FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
            pfree_ext(state->memtuples[i]);
        }
    }

    /* Back to the initial, empty, in-memory state. */
    state->status = TSS_INMEM;
    state->truncated = false;
    state->memtupdeleted = 0;
    state->memtupcount = 0;

    /* Rewind every read pointer to the beginning. */
    for (int i = 0; i < state->readptrcount; i++) {
        TSReadPointer* ptr = &state->readptrs[i];

        ptr->eof_reached = false;
        ptr->current = 0;
    }
}

/*
 * tuplestore_end
 *
 *	Release resources and clean up.
 */
void tuplestore_end(Tuplestorestate* state)
{
    if (state == NULL) {
        ereport(ERROR,
            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                errmsg("Tuplestore end failed: invalid state.")));
    }

    /* Close the temp file, if one was created. */
    if (state->myfile != NULL) {
        BufFileClose(state->myfile);
    }

    /* Free any remaining in-memory tuples, then the pointer array itself. */
    if (state->memtuples != NULL) {
        for (int i = state->memtupdeleted; i < state->memtupcount; i++) {
            pfree_ext(state->memtuples[i]);
        }
        pfree_ext(state->memtuples);
    }

#ifdef USE_SPQ
    /* The writer side owns the shared file and must remove it. */
    if (state->share_status == TSHARE_WRITER)
        BufFileDeleteShared(state->fileset, state->shared_filename);
    if (state->shared_filename)
        pfree(state->shared_filename);
#endif

    pfree_ext(state->readptrs);
    pfree_ext(state);
}

/*
 * tuplestore_select_read_pointer - make the specified read pointer active
 */
void tuplestore_select_read_pointer(Tuplestorestate* state, int ptr)
{
    Assert(ptr >= 0 && ptr < state->readptrcount);

    /* Nothing to do if the requested pointer is already active. */
    if (ptr == state->activeptr) {
        return;
    }

    TSReadPointer* newptr = &state->readptrs[ptr];
    TSReadPointer* oldptr = &state->readptrs[state->activeptr];

    switch (state->status) {
        case TSS_INMEM:
        case TSS_WRITEFILE:
            /* no work */
            break;
        case TSS_READFILE: {
            /*
             * First, save the current read position in the pointer about to
             * become inactive.
             */
            if (!oldptr->eof_reached)
                BufFileTell(state->myfile, &oldptr->file, &oldptr->offset);

            /*
             * We have to make the temp file's seek position equal to the
             * logical position of the new read pointer.  In eof_reached
             * state, that's the EOF, which we have available from the saved
             * write position.
             */
            int seekFile = newptr->eof_reached ? state->writepos_file : newptr->file;
            off_t seekOffset = newptr->eof_reached ? state->writepos_offset : newptr->offset;

            if (BufFileSeek(state->myfile, seekFile, seekOffset, SEEK_SET) != 0)
                ereport(ERROR,
                    (errmodule(MOD_EXECUTOR),
                        (errcode(ERRCODE_FILE_READ_FAILED), errmsg("tuplestore seek failed"))));
            break;
        }
        default:
            ereport(ERROR,
                (errmodule(MOD_EXECUTOR),
                    (errcode(ERRCODE_UNEXPECTED_NODE_STATE), errmsg("invalid tuplestore state"))));
            break;
    }

    state->activeptr = ptr;
}

/*
 * tuplestore_ateof
 *
 * Returns the active read pointer's eof_reached state.
 */
bool tuplestore_ateof(Tuplestorestate* state)
{
    /* Report whether the active read pointer has hit end-of-data. */
    TSReadPointer* active = &state->readptrs[state->activeptr];

    return active->eof_reached;
}

/*
 * Grow the memtuples[] array, if possible within our memory constraint.
 * Return TRUE if we were able to enlarge the array, FALSE if not.
 *
 * Normally, at each increment we double the size of the array.  When we no
 * longer have enough memory to do that, we attempt one last, smaller increase
 * (and then clear the growmemtuples flag so we don't try any more).  That
 * allows us to use allowedMem as fully as possible; sticking to the pure
 * doubling rule could result in almost half of allowedMem going unused.
 * Because availMem moves around with tuple addition/removal, we need some
 * rule to prevent making repeated small increases in memtupsize, which would
 * just be useless thrashing.  The growmemtuples flag accomplishes that and
 * also prevents useless recalculations in this function.
 */
static bool grow_memtuples(Tuplestorestate* state)
{
    int newmemtupsize;
    int memtupsize = state->memtupsize;
    int64 memNowUsed = state->allowedMem - state->availMem;
    bool needAutoSpread = false;
    /* Upper bound on total array bytes: the larger of allowedMem and MaxAllocSize */
    int64 growMemTotal = (state->allowedMem > (int64)MaxAllocSize) ? state->allowedMem : (int64)MaxAllocSize;
    /* Cap the growth ratio so the grown pointer array itself fits in growMemTotal */
    double growRatio = Min(DEFAULT_GROW_RATIO, ((double)((double)growMemTotal / sizeof(void*)) / memtupsize));
    double unspreadGrowRatio = growRatio;

    /* Forget it if we've already maxed out memtuples, per comment above */
    if (!state->growmemtuples) {
        return false;
    }

    /* Select new value of memtupsize */
    if (memNowUsed <= state->availMem) {
        /*
         * It is surely safe to double memtupsize if we've used no more than
         * half of allowedMem.
         *
         * Note: it might seem that we need to worry about memtupsize * 2
         * overflowing an int, but the MaxAllocSize clamp applied below
         * ensures the existing memtupsize can't be large enough for that.
         */
        newmemtupsize = (int)(memtupsize * growRatio);
    } else {
        /*
         * This will be the last increment of memtupsize.  Abandon doubling
         * strategy and instead increase as much as we safely can.
         *
         * To stay within allowedMem, we can't increase memtupsize by more
         * than availMem / sizeof(void *) elements. In practice, we want
         * to increase it by considerably less, because we need to leave some
         * space for the tuples to which the new array slots will refer.  We
         * assume the new tuples will be about the same size as the tuples
         * we've already seen, and thus we can extrapolate from the space
         * consumption so far to estimate an appropriate new size for the
         * memtuples array.  The optimal value might be higher or lower than
         * this estimate, but it's hard to know that in advance.
         *
         * This calculation is safe against enlarging the array so much that
         * LACKMEM becomes true, because the memory currently used includes
         * the present array; thus, there would be enough allowedMem for the
         * new array elements even if no other memory were currently used.
         *
         * We do the arithmetic in float8, because otherwise the product of
         * memtupsize and allowedMem could overflow.  (A little algebra shows
         * that grow_ratio must be less than 2 here, so we are not risking
         * integer overflow this way.)  Any inaccuracy in the result should be
         * insignificant; but even if we computed a completely insane result,
         * the checks below will prevent anything really bad from happening.
         */
        unspreadGrowRatio = Min(unspreadGrowRatio, (double)state->allowedMem / (double)memNowUsed);
        newmemtupsize = (int)(memtupsize * unspreadGrowRatio);
    }

    /* Must enlarge array by at least one element, else report failure */
    if (newmemtupsize <= memtupsize) {
        needAutoSpread = true;
    } else {
        growRatio = unspreadGrowRatio;
    }

    /*
     * We need to be sure that we do not cause LACKMEM to become true, else
     * the space management algorithm will go nuts.  The code above should
     * never generate a dangerous request, but to be safe, check explicitly
     * that the array growth fits within availMem.  (We could still cause
     * LACKMEM if the memory chunk overhead associated with the memtuples
     * array were to increase.  That shouldn't happen with any sane value of
     * allowedMem, because at any array size large enough to risk LACKMEM,
     * palloc would be treating both old and new arrays as separate chunks.
     * But we'll check LACKMEM explicitly below just in case.)
     */
    if (state->availMem < (long)((newmemtupsize - memtupsize) * sizeof(void*)))
        needAutoSpread = true;

    /* System-wide memory pressure: give up growing and spill early instead */
    if (gs_sysmemory_busy(memNowUsed * state->dop, true)) {
        MEMCTL_LOG(LOG,
            "Materialize(%d) early spilled, workmem: %ldKB, availmem: %ldKB",
            state->planId,
            state->allowedMem / 1024L,
            state->availMem / 1024L);
        pgstat_add_warning_early_spill();
        state->causedBySysRes = true;
        if (state->isMemCtl) {
            /* Pin the controlled context's ceiling at the current usage */
            AllocSetContext* set = (AllocSetContext*)(state->context);
            set->maxSpaceSize = memNowUsed;
            state->allowedMem = memNowUsed;
        }
        goto noalloc;
    }

    /* Try to widen the budget; on failure, stop growing for good */
    if (needAutoSpread && !AutoSpreadMem(state, &growRatio)) {
        goto noalloc;
    }

    /* if there's no more space after grow, then fail */
    if (memtupsize * growRatio <= memtupsize) {
        MEMCTL_LOG(LOG, "Materialize(%d) mem limit reached", state->planId);
        goto noalloc;
    }

    /* Confirm the system can actually supply the extra memory */
    if (!gs_sysmemory_avail((int64)(memNowUsed * (growRatio - 1)))) {
        MEMCTL_LOG(LOG,
            "Materialize(%d) mem lack, workmem: %ldKB, availmem: %ldKB,"
            "usedmem: %ldKB, grow ratio: %.2f",
            state->planId,
            state->allowedMem / 1024L,
            state->availMem / 1024L,
            memNowUsed / 1024L,
            growRatio);
        goto noalloc;
    }

    /* OK, do it */
    FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
    state->memtupsize = (int)(memtupsize * growRatio);
    state->memtuples = (void**)repalloc_huge(state->memtuples, state->memtupsize * sizeof(void*));
    USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    if (state->availMem < 0)
        goto noalloc;

    return true;

noalloc:
    /* If for any reason we didn't realloc, shut off future attempts */
    state->growmemtuples = false;
    return false;
}

/*
 * Accept one tuple and append it to the tuplestore.
 *
 * Note that the input tuple is always copied; the caller need not save it.
 *
 * If the active read pointer is currently "at EOF", it remains so (the read
 * pointer implicitly advances along with the write pointer); otherwise the
 * read pointer is unchanged.  Non-active read pointers do not move, which
 * means they are certain to not be "at EOF" immediately after puttuple.
 * This curious-seeming behavior is for the convenience of nodeMaterial.c and
 * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
 * steps.
 *
 * tuplestore_puttupleslot() is a convenience routine to collect data from
 * a TupleTableSlot without an extra copy operation.
 */
void tuplestore_puttupleslot(Tuplestorestate* state, TupleTableSlot* slot, bool need_transform_anyarray)
{
    MemoryContext savedcxt = MemoryContextSwitchTo(state->context);

    /* Materialize the slot's contents as a MinimalTuple in the store's context */
    MinimalTuple mtup = ExecCopySlotMinimalTuple(slot, need_transform_anyarray);

    /* while memory-resident, state->width accumulates total tuple bytes */
    if (tuplestore_in_memory(state)) {
        state->width += mtup->t_len;
    }
    USEMEM(state, GetMemoryChunkSpace(mtup));

    tuplestore_puttuple_common(state, (void*)mtup);

    (void)MemoryContextSwitchTo(savedcxt);
}

/*
 * "Standard" case to copy from a HeapTuple.  This is actually now somewhat
 * deprecated, but not worth getting rid of in view of the number of callers.
 */
void tuplestore_puttuple(Tuplestorestate* state, HeapTuple tuple)
{
    MemoryContext savedcxt = MemoryContextSwitchTo(state->context);
    void* copied = NULL;

    /*
     * Copy the tuple up front.  This is required even when we are already
     * writing to a file; note that COPYTUP performs the USEMEM accounting
     * itself, so no extra charge is needed here.
     */
    copied = COPYTUP(state, tuple);

    tuplestore_puttuple_common(state, copied);

    (void)MemoryContextSwitchTo(savedcxt);
}

/*
 * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
 * This avoids an extra tuple-construction operation.
 */
void tuplestore_putvalues(Tuplestorestate* state, TupleDesc tdesc, Datum* values, const bool* isnull)
{
    MemoryContext savedcxt = MemoryContextSwitchTo(state->context);

    /* Build the MinimalTuple directly from the datum/null arrays */
    MinimalTuple mtup = tableam_tops_form_minimal_tuple(tdesc, values, isnull, NULL);

    /* while memory-resident, state->width accumulates total tuple bytes */
    if (tuplestore_in_memory(state)) {
        state->width += mtup->t_len;
    }
    USEMEM(state, GetMemoryChunkSpace(mtup));

    tuplestore_puttuple_common(state, (void*)mtup);

    (void)MemoryContextSwitchTo(savedcxt);
}

/*
 * Common workhorse for all puttuple variants.
 *
 * 'tuple' has already been copied into state->context and charged to the
 * store's memory accounting by the caller; this routine appends it to the
 * store, switching from in-memory to tape-based operation when memory is
 * exhausted.
 */
static void tuplestore_puttuple_common(Tuplestorestate* state, void* tuple)
{
    TSReadPointer* readptr = NULL;
    int i;
    ResourceOwner oldowner;

    switch (state->status) {
        case TSS_INMEM:

            /*
             * Update read pointers as needed; see API spec above.  A
             * non-active pointer that was at EOF is repositioned to the slot
             * the incoming tuple will occupy; the active pointer implicitly
             * follows the write position.
             */
            readptr = state->readptrs;
            for (i = 0; i < state->readptrcount; readptr++, i++) {
                if (readptr->eof_reached && i != state->activeptr) {
                    readptr->eof_reached = false;
                    readptr->current = state->memtupcount;
                }
            }

            /*
             * Grow the array as needed.  Note that we try to grow the array
             * when there is still one free slot remaining --- if we fail,
             * there'll still be room to store the incoming tuple, and then
             * we'll switch to tape-based operation.
             */
            if (state->memtupcount >= state->memtupsize - 1 || state->availMem <= 0) {
                /* if mem is used up, then adjust tupsize */
                if (state->availMem <= 0)
                    state->memtupsize = state->memtupcount + 1;
                /* failure is fine; the spill path below handles it */
                (void)grow_memtuples(state);
                Assert(state->memtupcount < state->memtupsize);
            }

            /* Stash the tuple in the in-memory array */
            state->memtuples[state->memtupcount++] = tuple;

            /*
             * Done if we still fit in available memory and have array slots.
             */
            if (state->memtupcount < state->memtupsize && !LACKMEM(state)) {
                return;
            }

            /* convert accumulated total width into an average before spilling */
            if (state->memtupcount > 0)
                state->width = state->width / state->memtupcount;
            if (LACKMEM(state)) {
                /*
                 * Under workload memory control, clamp the context's space
                 * limit and our allowance down to what is actually in use,
                 * so no further growth is attempted.
                 */
                if (state->isMemCtl) {
                    AllocSetContext* set = (AllocSetContext*)(state->context);
                    int64 usedMem = state->allowedMem - state->availMem;
                    set->maxSpaceSize = usedMem;
                    state->allowedMem = usedMem;
                }
                elog(LOG,
                    "Materialize lacks mem, workmem: %ldKB, availmem: %ldKB",
                    state->allowedMem / 1024L,
                    state->availMem / 1024L);
            }

            /*
             * Nope; time to switch to tape-based operation.  Make sure that
             * the temp file(s) are created in suitable temp tablespaces.
             */
            PrepareTempTablespaces();

            /* associate the file with the store's resource owner */
            oldowner = t_thrd.utils_cxt.CurrentResourceOwner;
            t_thrd.utils_cxt.CurrentResourceOwner = state->resowner;

            state->myfile = BufFileCreateTemp(state->interXact);

            t_thrd.utils_cxt.CurrentResourceOwner = oldowner;

            /*
             * Freeze the decision about whether trailing length words will be
             * used.  We can't change this choice once data is on tape, even
             * though callers might drop the requirement.
             */
            state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
            state->status = TSS_WRITEFILE;
            /* flush the in-memory array (including this tuple) to the file */
            dumptuples(state);
            break;
        case TSS_WRITEFILE:

            /*
             * Update read pointers as needed; see API spec above. Note:
             * BufFileTell is quite cheap, so not worth trying to avoid
             * multiple calls.  EOF pointers pick up the current (pre-write)
             * file position, i.e. where this tuple is about to land.
             */
            readptr = state->readptrs;
            for (i = 0; i < state->readptrcount; readptr++, i++) {
                if (readptr->eof_reached && i != state->activeptr) {
                    readptr->eof_reached = false;
                    BufFileTell(state->myfile, &readptr->file, &readptr->offset);
                }
            }

            WRITETUP(state, tuple);
            break;
        case TSS_READFILE:

            /*
             * Switch from reading to writing: save the active pointer's
             * current seek position (unless it is at EOF), then seek to the
             * remembered end-of-data position before appending.
             */
            if (state->readptrs[state->activeptr].eof_reached == false)
                BufFileTell(
                    state->myfile, &state->readptrs[state->activeptr].file, &state->readptrs[state->activeptr].offset);
            if (BufFileSeek(state->myfile, state->writepos_file, state->writepos_offset, SEEK_SET) != 0)
                ereport(ERROR,
                    (errmodule(MOD_EXECUTOR),
                        (errcode(ERRCODE_FILE_READ_FAILED), errmsg("tuplestore seek to EOF failed"))));
            state->status = TSS_WRITEFILE;

            /*
             * Update read pointers as needed; see API spec above.  EOF
             * pointers are set to the old end-of-data, where the new tuple
             * will be written.
             */
            readptr = state->readptrs;
            for (i = 0; i < state->readptrcount; readptr++, i++) {
                if (readptr->eof_reached && i != state->activeptr) {
                    readptr->eof_reached = false;
                    readptr->file = state->writepos_file;
                    readptr->offset = state->writepos_offset;
                }
            }

            WRITETUP(state, tuple);
            break;
        default:
            ereport(ERROR,
                (errmodule(MOD_EXECUTOR),
                    (errcode(ERRCODE_UNEXPECTED_NODE_STATE), errmsg("invalid tuplestore state"))));
            break;
    }
}

/*
 * Fetch the next tuple in either forward or back direction.
 * Returns NULL if no more tuples.	If should_free is set, the
 * caller must pfree the returned tuple when done with it.
 *
 * Backward scan is only allowed if randomAccess was set true or
 * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
 */
/*
 * Fetch the next tuple in either forward or back direction.
 * Returns NULL if no more tuples.	If should_free is set, the
 * caller must pfree the returned tuple when done with it.
 *
 * Backward scan is only allowed if randomAccess was set true or
 * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
 */
static void* tuplestore_gettuple(Tuplestorestate* state, bool forward, bool* should_free)
{
    TSReadPointer* readptr = &state->readptrs[state->activeptr];
    unsigned int tuplen;
    void* tup = NULL;

    Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));

    switch (state->status) {
        case TSS_INMEM:
            /* in-memory tuples are owned by the store; caller must not free */
            *should_free = false;
            if (forward) {
                if (readptr->eof_reached)
                    return NULL;
                if (readptr->current < state->memtupcount) {
                    /* We have another tuple, so return it */
                    return state->memtuples[readptr->current++];
                }
                readptr->eof_reached = true;
                return NULL;
            } else {
                /*
                 * if all tuples are fetched already then we return last
                 * tuple, else tuple before last returned.
                 */
                if (readptr->eof_reached) {
                    readptr->current = state->memtupcount;
                    readptr->eof_reached = false;
                } else {
                    /* can't back up past the trim point */
                    if (readptr->current <= state->memtupdeleted) {
                        Assert(!state->truncated);
                        return NULL;
                    }
                    readptr->current--; /* last returned tuple */
                }
                if (readptr->current <= state->memtupdeleted) {
                    Assert(!state->truncated);
                    return NULL;
                }
                /* current points one past the tuple we are about to return */
                return state->memtuples[readptr->current - 1];
            }
            break;

        case TSS_WRITEFILE:
            /* Skip state change if we'll just return NULL */
            if (readptr->eof_reached && forward)
                return NULL;

            /*
             * Switch from writing to reading: remember the end-of-data
             * position, then seek to this pointer's saved read position.
             */
            BufFileTell(state->myfile, &state->writepos_file, &state->writepos_offset);
            if (!readptr->eof_reached)
                if (BufFileSeek(state->myfile, readptr->file, readptr->offset, SEEK_SET) != 0)
                    ereport(ERROR,
                        (errmodule(MOD_EXECUTOR),
                            (errcode(ERRCODE_FILE_READ_FAILED), errmsg("tuplestore seek failed"))));
            state->status = TSS_READFILE;
            /* FALL THRU into READFILE case */

        case TSS_READFILE:
            /* tuples read from file are freshly palloc'd; caller frees */
            *should_free = true;
            if (forward) {
                /* zero length word (or clean EOF) means end of data */
                if ((tuplen = getlen(state, true)) != 0) {
                    tup = READTUP(state, tuplen);
                    return tup;
                } else {
                    readptr->eof_reached = true;
                    return NULL;
                }
            }

            /*
             * Backward.
             *
             * if all tuples are fetched already then we return last tuple,
             * else tuple before last returned.
             *
             * Back up to fetch previously-returned tuple's ending length
             * word. If seek fails, assume we are at start of file.
             */
            if (BufFileSeek(state->myfile, 0, -(long)sizeof(unsigned int), SEEK_CUR) != 0) {
                /* even a failed backwards fetch gets you out of eof state */
                readptr->eof_reached = false;
                Assert(!state->truncated);
                return NULL;
            }
            tuplen = getlen(state, false);

            if (readptr->eof_reached) {
                readptr->eof_reached = false;
                /* We will return the tuple returned before returning NULL */
            } else {
                /*
                 * Back up to get ending length word of tuple before it.
                 * (Skip back over the previously-returned tuple's body, its
                 * leading length word, and the prior tuple's trailing word.)
                 */
                if (BufFileSeek(state->myfile, 0, -(long)(tuplen + 2 * sizeof(unsigned int)), SEEK_CUR) != 0) {
                    /*
                     * If that fails, presumably the prev tuple is the first
                     * in the file.  Back up so that it becomes next to read
                     * in forward direction (not obviously right, but that is
                     * what in-memory case does).
                     */
                    if (BufFileSeek(state->myfile, 0, -(long)(tuplen + sizeof(unsigned int)), SEEK_CUR) != 0)
                        ereport(ERROR,
                            (errmodule(MOD_EXECUTOR),
                                (errcode(ERRCODE_FILE_READ_FAILED), errmsg("bogus tuple length in backward scan"))));
                    Assert(!state->truncated);
                    return NULL;
                }
                tuplen = getlen(state, false);
            }

            /*
             * Now we have the length of the prior tuple, back up and read it.
             * Note: READTUP expects we are positioned after the initial
             * length word of the tuple, so back up to that point.
             */
            if (BufFileSeek(state->myfile, 0, -(long)tuplen, SEEK_CUR) != 0)
                ereport(ERROR,
                    (errmodule(MOD_EXECUTOR),
                        (errcode(ERRCODE_FILE_READ_FAILED), errmsg("bogus tuple length in backward scan"))));
            tup = READTUP(state, tuplen);
            return tup;

        default:
            ereport(ERROR,
                (errmodule(MOD_EXECUTOR),
                    (errcode(ERRCODE_UNEXPECTED_NODE_STATE), errmsg("invalid tuplestore state"))));
            return NULL; /* keep compiler quiet */
    }
}

/*
 * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
 *
 * If successful, put tuple in slot and return TRUE; else, clear the slot
 * and return FALSE.
 *
 * If copy is TRUE, the slot receives a copied tuple (allocated in current
 * memory context) that will stay valid regardless of future manipulations of
 * the tuplestore's state.  If copy is FALSE, the slot may just receive a
 * pointer to a tuple held within the tuplestore.  The latter is more
 * efficient but the slot contents may be corrupted if additional writes to
 * the tuplestore occur.  (If using tuplestore_trim, see comments therein.)
 */
bool tuplestore_gettupleslot(Tuplestorestate* state, bool forward, bool copy, TupleTableSlot* slot)
{
    bool should_free = false;
    MinimalTuple mtup = (MinimalTuple)tuplestore_gettuple(state, forward, &should_free);

    /* no more tuples: clear the slot and report failure */
    if (mtup == NULL) {
        (void)ExecClearTuple(slot);
        return false;
    }

    if (copy && !should_free) {
        /* caller wants a private copy that survives further store activity */
        mtup = tableam_tops_copy_minimal_tuple(mtup);
        should_free = true;
    }
    ExecStoreMinimalTuple(mtup, slot, should_free);
    return true;
}

/*
 * tuplestore_advance - exported function to adjust position without fetching
 *
 * We could optimize this case to avoid palloc/pfree overhead, but for the
 * moment it doesn't seem worthwhile.  (XXX this probably needs to be
 * reconsidered given the needs of window functions.)
 */
bool tuplestore_advance(Tuplestorestate* state, bool forward)
{
    bool should_free = false;
    void* tup = tuplestore_gettuple(state, forward, &should_free);

    if (tup == NULL) {
        return false;
    }
    /* we only wanted to move the pointer; discard the fetched tuple */
    if (should_free) {
        pfree_ext(tup);
    }
    return true;
}

/*
 * dumptuples - remove tuples from memory and write to tape
 *
 * As a side effect, we must convert each read pointer's position from
 * "current" to file/offset format.  But eof_reached pointers don't
 * need to change state.
 */
/*
 * dumptuples - remove tuples from memory and write to tape
 *
 * As a side effect, we must convert each read pointer's position from
 * "current" (an array index) to file/offset format.  But eof_reached
 * pointers don't need to change state, since they implicitly follow the
 * write position.
 */
static void dumptuples(Tuplestorestate* state)
{
    int i;

    /*
     * Note the loop deliberately runs one extra iteration (i ==
     * memtupcount) so that read pointers positioned just past the last
     * tuple also get their file position captured before we break out.
     */
    for (i = state->memtupdeleted;; i++) {
        TSReadPointer* readptr = state->readptrs;
        int j;

        /* capture the current file position for any pointer parked at slot i */
        for (j = 0; j < state->readptrcount; readptr++, j++) {
            if (i == readptr->current && !readptr->eof_reached)
                BufFileTell(state->myfile, &readptr->file, &readptr->offset);
        }
        if (i >= state->memtupcount) {
            break;
        }
        /* WRITETUP also releases the in-memory copy of the tuple */
        WRITETUP(state, state->memtuples[i]);
    }
    state->memtupdeleted = 0;
    state->memtupcount = 0;
}

/*
 * tuplestore_rescan		- rewind the active read pointer to start
 */
/*
 * tuplestore_rescan		- rewind the active read pointer to start
 */
void tuplestore_rescan(Tuplestorestate* state)
{
    TSReadPointer* active = &state->readptrs[state->activeptr];

    /* rewinding requires REWIND capability and an untruncated store */
    Assert(active->eflags & EXEC_FLAG_REWIND);
    Assert(!state->truncated);

    switch (state->status) {
        case TSS_INMEM:
            active->eof_reached = false;
            active->current = 0;
            break;
        case TSS_WRITEFILE:
            /* just reset the saved position; no seek needed until we read */
            active->eof_reached = false;
            active->file = 0;
            active->offset = 0L;
            break;
        case TSS_READFILE:
            /* the active pointer's position is the seek point itself */
            active->eof_reached = false;
            if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
                ereport(ERROR,
                    (errmodule(MOD_EXECUTOR),
                        (errcode(ERRCODE_FILE_READ_FAILED), errmsg("tuplestore seek to start failed"))));
            break;
        default:
            ereport(ERROR,
                (errmodule(MOD_EXECUTOR),
                    (errcode(ERRCODE_UNEXPECTED_NODE_STATE), errmsg("invalid tuplestore state"))));
            break;
    }
}

/*
 * tuplestore_copy_read_pointer - copy a read pointer's state to another
 */
/*
 * tuplestore_copy_read_pointer - copy a read pointer's state to another
 *
 * Also recomputes the store's combined eflags when the copy changes the
 * destination pointer's flags, and keeps the file seek position consistent
 * when either pointer is the active one.
 */
void tuplestore_copy_read_pointer(Tuplestorestate* state, int srcptr, int destptr)
{
    TSReadPointer* sptr = &state->readptrs[srcptr];
    TSReadPointer* dptr = &state->readptrs[destptr];

    Assert(srcptr >= 0 && srcptr < state->readptrcount);
    Assert(destptr >= 0 && destptr < state->readptrcount);

    /* Assigning to self is a no-op */
    if (srcptr == destptr) {
        return;
    }

    if (dptr->eflags != sptr->eflags) {
        /* Possible change of overall eflags, so copy and then recompute */
        int eflags;
        int i;

        *dptr = *sptr;
        /* OR together all pointers' eflags; reject negative values since the
         * bitwise arithmetic below assumes non-negative flag words */
        eflags = state->readptrs[0].eflags;
        if (eflags < 0)
            ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
                            errmsg("number need be a positive number in bitwise operation")));
        for (i = 1; i < state->readptrcount; i++) {
            if (state->readptrs[i].eflags < 0)
                ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
                                errmsg("number need be a positive number in bitwise operation")));
            eflags = (unsigned int)eflags | (unsigned int)state->readptrs[i].eflags;
        }
        state->eflags = eflags;
    } else
        *dptr = *sptr;

    switch (state->status) {
        case TSS_INMEM:
        case TSS_WRITEFILE:
            /* no work */
            break;
        case TSS_READFILE:

            /*
             * This case is a bit tricky since the active read pointer's
             * position corresponds to the seek point, not what is in its
             * variables.  Assigning to the active requires a seek, and
             * assigning from the active requires a tell, except when
             * eof_reached.
             */
            if (destptr == state->activeptr) {
                if (dptr->eof_reached) {
                    /* EOF means "at end of data": seek to the write position */
                    if (BufFileSeek(state->myfile, state->writepos_file, state->writepos_offset, SEEK_SET) != 0)
                        ereport(ERROR,
                            (errmodule(MOD_EXECUTOR),
                                (errcode(ERRCODE_FILE_READ_FAILED), errmsg("tuplestore seek failed"))));
                } else {
                    if (BufFileSeek(state->myfile, dptr->file, dptr->offset, SEEK_SET) != 0)
                        ereport(ERROR,
                            (errmodule(MOD_EXECUTOR),
                                (errcode(ERRCODE_FILE_READ_FAILED), errmsg("tuplestore seek failed"))));
                }
            } else if (srcptr == state->activeptr) {
                /* the active pointer's true position lives in the file handle */
                if (!dptr->eof_reached)
                    BufFileTell(state->myfile, &dptr->file, &dptr->offset);
            }
            break;
        default:
            ereport(ERROR,
                (errmodule(MOD_EXECUTOR),
                    (errcode(ERRCODE_UNEXPECTED_NODE_STATE), errmsg("invalid tuplestore state"))));
            break;
    }
}

/*
 * tuplestore_trim	- remove all no-longer-needed tuples
 *
 * Calling this function authorizes the tuplestore to delete all tuples
 * before the oldest read pointer, if no read pointer is marked as requiring
 * REWIND capability.
 *
 * Note: this is obviously safe if no pointer has BACKWARD capability either.
 * If a pointer is marked as BACKWARD but not REWIND capable, it means that
 * the pointer can be moved backward but not before the oldest other read
 * pointer.
 */
void tuplestore_trim(Tuplestorestate* state)
{
    int oldest;
    int nremove;
    int i;

    /*
     * Truncation is disallowed if any read pointer requires rewind
     * capability.
     */
    if (state->eflags & EXEC_FLAG_REWIND)
        return;

    /*
     * We don't bother trimming temp files since it usually would mean more
     * work than just letting them sit in kernel buffers until they age out.
     */
    if (state->status != TSS_INMEM)
        return;

    /* Find the oldest read pointer (EOF pointers track the write end, so
     * they never constrain trimming) */
    oldest = state->memtupcount;
    for (i = 0; i < state->readptrcount; i++) {
        if (state->readptrs[i].eof_reached == false)
            oldest = Min(oldest, state->readptrs[i].current);
    }

    /*
     * Note: you might think we could remove all the tuples before the oldest
     * "current", since that one is the next to be returned.  However, since
     * tuplestore_gettuple returns a direct pointer to our internal copy of
     * the tuple, it's likely that the caller has still got the tuple just
     * before "current" referenced in a slot. So we keep one extra tuple
     * before the oldest "current".  (Strictly speaking, we could require such
     * callers to use the "copy" flag to tuplestore_gettupleslot, but for
     * efficiency we allow this one case to not use "copy".)
     */
    nremove = oldest - 1;
    if (nremove <= 0) {
        return; /* nothing to do */
    }
    Assert(nremove >= state->memtupdeleted);
    Assert(nremove <= state->memtupcount);

    /* Release no-longer-needed tuples (only those not already deleted) */
    for (i = state->memtupdeleted; i < nremove; i++) {
        FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
        pfree_ext(state->memtuples[i]);
        state->memtuples[i] = NULL;
    }
    state->memtupdeleted = nremove;

    /* mark tuplestore as truncated (used for Assert crosschecks only) */
    state->truncated = true;

    /*
     * If nremove is less than 1/8th memtupcount, just stop here, leaving the
     * "deleted" slots as NULL.  This prevents us from expending O(N^2) time
     * repeatedly memmove-ing a large pointer array.  The worst case space
     * wastage is pretty small, since it's just pointers and not whole tuples.
     */
    if (nremove < state->memtupcount / 8) {
        return;
    }
    /*
     * Slide the array down and readjust pointers.
     *
     * In mergejoin's current usage, it's demonstrable that there will always
     * be exactly one non-removed tuple; so optimize that case.
     */
    if (nremove + 1 == state->memtupcount) {
        state->memtuples[0] = state->memtuples[nremove];
    } else {
        /* regions may overlap, hence memmove_s rather than memcpy */
        int msize = (state->memtupcount - nremove) * sizeof(void*);
        errno_t rc = memmove_s(state->memtuples, msize, state->memtuples + nremove, msize);
        securec_check(rc, "\0", "\0");
    }

    state->memtupdeleted = 0;
    state->memtupcount -= nremove;
    /* shift every live read pointer's index down by the amount removed */
    for (i = 0; i < state->readptrcount; i++) {
        if (state->readptrs[i].eof_reached == false)
            state->readptrs[i].current -= nremove;
    }
}

/*
 * tuplestore_in_memory
 *
 * Returns true if the tuplestore has not spilled to disk.
 *
 * XXX exposing this is a violation of modularity ... should get rid of it.
 */
bool tuplestore_in_memory(Tuplestorestate* state)
{
    /* true while no tuples have been spilled to a temp file */
    return state->status == TSS_INMEM;
}

/*
 * Tape interface routines
 */

/*
 * Read the next tuple-length word from the temp file.  Returns 0 at clean
 * EOF when eofOK is true; any other short read is an error.
 */
static unsigned int getlen(Tuplestorestate* state, bool eofOK)
{
    unsigned int len = 0;
    size_t nread = BufFileRead(state->myfile, (void*)&len, sizeof(len));

    if (nread == sizeof(len))
        return len;

    /* a partial length word means the file is truncated or corrupt */
    if (nread != 0)
        ereport(
            ERROR, (errmodule(MOD_EXECUTOR), (errcode(ERRCODE_FILE_READ_FAILED), errmsg("unexpected end of tape"))));

    /* clean EOF: acceptable only where the caller expects it */
    if (!eofOK)
        ereport(
            ERROR, (errmodule(MOD_EXECUTOR), (errcode(ERRCODE_FILE_READ_FAILED), errmsg("unexpected end of data"))));
    return 0;
}

/*
 * Routines specialized for HeapTuple case
 *
 * The stored form is actually a MinimalTuple, but for largely historical
 * reasons we allow COPYTUP to work from a HeapTuple.
 *
 * Since MinimalTuple already has length in its first word, we don't need
 * to write that separately.
 */

static void* copytup_heap(Tuplestorestate* state, void* tup)
{
    HeapTuple htup = (HeapTuple)tup;
    MinimalTuple mtup;

    Assert(!HEAP_TUPLE_IS_COMPRESSED(htup->t_data));

    /* convert to the MinimalTuple form we store internally */
    mtup = minimal_tuple_from_heap_tuple(htup);
    if (tuplestore_in_memory(state)) {
        state->width += mtup->t_len;
    }
    USEMEM(state, GetMemoryChunkSpace(mtup));
    return (void*)mtup;
}

static void writetup_heap(Tuplestorestate* state, void* tup)
{
    MinimalTuple mtup = (MinimalTuple)tup;

    /* only the part past the data offset is written to disk */
    char* body = (char*)mtup + MINIMAL_TUPLE_DATA_OFFSET;
    unsigned int bodylen = mtup->t_len - MINIMAL_TUPLE_DATA_OFFSET;

    /* on-disk footprint includes the leading length word itself */
    unsigned int tuplen = bodylen + sizeof(int);

    if (BufFileWrite(state->myfile, (void*)&tuplen, sizeof(tuplen)) != sizeof(tuplen))
        ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to tuplestore temporary file: %m")));

    if (BufFileWrite(state->myfile, (void*)body, bodylen) != (size_t)bodylen)
        ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to tuplestore temporary file: %m")));

    /* a trailing length word is needed only when backward scans are possible */
    if (state->backward) {
        if (BufFileWrite(state->myfile, (void*)&tuplen, sizeof(tuplen)) != sizeof(tuplen))
            ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to tuplestore temporary file: %m")));
    }

    /* the in-memory copy is no longer needed once it is safely on tape */
    FREEMEM(state, GetMemoryChunkSpace(mtup));
    heap_free_minimal_tuple(mtup);
}

static void* readtup_heap(Tuplestorestate* state, unsigned int len)
{
    /* 'len' is the on-disk size including the leading length word */
    unsigned int bodylen = len - sizeof(int);
    unsigned int tuplen = bodylen + MINIMAL_TUPLE_DATA_OFFSET;
    MinimalTuple mtup = (MinimalTuple)palloc(tuplen);
    char* body = (char*)mtup + MINIMAL_TUPLE_DATA_OFFSET;

    USEMEM(state, GetMemoryChunkSpace(mtup));

    /* reconstruct the in-memory length word, then read the tuple body */
    mtup->t_len = tuplen;
    if (BufFileRead(state->myfile, (void*)body, bodylen) != (size_t)bodylen)
        ereport(
            ERROR, (errmodule(MOD_EXECUTOR), (errcode(ERRCODE_FILE_READ_FAILED), errmsg("unexpected end of data"))));

    /* consume the trailing length word if the file format carries one */
    if (state->backward) {
        if (BufFileRead(state->myfile, (void*)&tuplen, sizeof(tuplen)) != sizeof(tuplen))
            ereport(ERROR,
                (errmodule(MOD_EXECUTOR), (errcode(ERRCODE_FILE_READ_FAILED), errmsg("unexpected end of data"))));
    }
    return (void*)mtup;
}

/* routines to get variable of the state since it's static */
/*
 * Return the average tuple width in bytes.
 *
 * While the store is memory-resident, state->width holds the running total
 * of tuple bytes (see tuplestore_puttupleslot et al.), so we divide by the
 * tuple count here.  After spilling, tuplestore_puttuple_common has already
 * converted width to an average, so it is returned as-is.
 *
 * Bug fix: the previous version assigned the computed average back into
 * state->width, so a second call divided again (returning total/count^2)
 * and any later puttuple added a raw t_len onto an average, corrupting the
 * width tracking.  Compute without mutating so the getter is idempotent.
 */
int64 tuplestore_get_avgwidth(Tuplestorestate* state)
{
    if (tuplestore_in_memory(state) && state->memtupcount > 0)
        return state->width / state->memtupcount;
    return state->width;
}

/* Report whether this store's spill was caused by system resource pressure. */
bool tuplestore_get_busy_status(Tuplestorestate* state)
{
    return state->causedBySysRes;
}

/* Report how many times this store's memory allowance was auto-spread. */
int tuplestore_get_spread_num(Tuplestorestate* state)
{
    return state->spreadNum;
}

/* Report the number of tuples currently held in the in-memory array. */
int tuplestore_get_memtupcount(Tuplestorestate* state)
{
    return state->memtupcount;
}

#ifdef USE_SPQ
/*
 * tuplestore_make_shared
 *
 * Make a tuplestore available for sharing later. This must be called
 * immediately after tuplestore_begin_heap().
 */
/*
 * tuplestore_make_shared
 *
 * Make a tuplestore available for sharing later. This must be called
 * immediately after tuplestore_begin_heap(), before any tuples are added.
 */
void tuplestore_make_shared(Tuplestorestate *state, SharedFileSet *fileset, const char *filename)
{
    ResourceOwner savedOwner;

    Assert(state->status == TSS_INMEM);
    Assert(state->share_status == TSHARE_NOT_SHARED);

    state->share_status = TSHARE_WRITER;
    state->fileset = fileset;
    state->shared_filename = pstrdup(filename);

    /*
     * Go straight to tape-based operation, as tuplestore_puttuple_common()
     * would eventually.  We could wait until tuplestore_freeze(), but every
     * tuple must end up in the file anyway, so buffering them in memory
     * first would be wasted effort.
     */
    PrepareTempTablespaces();

    /* create the shared file under the store's resource owner */
    savedOwner = t_thrd.utils_cxt.CurrentResourceOwner;
    t_thrd.utils_cxt.CurrentResourceOwner = state->resowner;
    state->myfile = BufFileCreateShared(fileset, filename);
    t_thrd.utils_cxt.CurrentResourceOwner = savedOwner;

    /*
     * Be conservative and always write trailing length words for
     * cross-process tuplestores: writer and reader must agree on the
     * on-tape format, and forcing this true is the simplest way.
     */
    state->backward = true;
    state->status = TSS_WRITEFILE;
}
 
/* WRITETUP handler installed for reader-side stores: any write is a bug. */
static void writetup_forbidden(Tuplestorestate *state, void *tup)
{
    elog(ERROR, "cannot write to tuplestore, it is already frozen");
}
 
/*
 * tuplestore_freeze
 *
 * Flush the current buffer to disk, and forbid further inserts. This
 * prepares the tuplestore for reading from a different process.
 */
/*
 * tuplestore_freeze
 *
 * Flush the current buffer to disk, and forbid further inserts. This
 * prepares the tuplestore for reading from a different process.
 */
void tuplestore_freeze(Tuplestorestate *state)
{
    Assert(state->share_status == TSHARE_WRITER);
    Assert(!state->frozen);

    /* push any remaining in-memory tuples out, then export the file */
    dumptuples(state);
    BufFileExportShared(state->myfile);
    state->frozen = true;
}
 
/*
 * tuplestore_open_shared
 *
 * Open a shared tuplestore that has been populated in another process
 * for reading.
 */
/*
 * tuplestore_open_shared
 *
 * Open a shared tuplestore that has been populated in another process
 * for reading.
 */
Tuplestorestate *tuplestore_open_shared(SharedFileSet *fileset, const char *filename)
{
    Tuplestorestate *state = NULL;

    /* readers may scan in either direction and rewind at will */
    const int eflags = EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND;

    state = tuplestore_begin_common(eflags,
                                    false /* interXact, ignored because we open existing files */,
                                    10 /* no need for memory buffers */);

    /* the shared on-tape format always carries trailing length words */
    state->backward = true;

    state->copytup = copytup_heap;
    state->writetup = writetup_forbidden;  /* reader side must never write */
    state->readtup = readtup_heap;

    /* position read pointer 0 at the start of the existing file */
    state->myfile = BufFileOpenShared(fileset, filename);
    state->readptrs[0].file = 0;
    state->readptrs[0].offset = 0L;
    state->status = TSS_READFILE;

    state->share_status = TSHARE_READER;
    state->frozen = false;
    state->fileset = fileset;
    state->shared_filename = pstrdup(filename);

    return state;
}
#endif
